iT邦幫忙

2021 iThome 鐵人賽

DAY 13
0
Software Development

從零開始Reactive Programming- Spring系列 第 14

[Day 13] Reactive Programming - Reactor(Processors & Sinks)

  • 分享至 

  • xImage
  •  

前言

這個主題花了我好多的時間查資料,之前提到動態的產生publisher其實就有sink的概念,但文件上對於sink的描述不是很清楚,當然有可能是我資質駑鈍,找了一些其他資料後終於了解其實Sinks就是經過Reactor優化過後的Processors

Processor

在介紹Reactive Programming的時候有提到在Publisher & Subscriber之間有個Processor來扮演中間人轉換之類的角色,然而在Reactor中,這些功能大部分其實透過Publisher 的operator就能做到,而其中剩下比較特別的部分,就還是需要使用Processor來特別處理,Processor同樣身為Subscriber所以可以直接使用onNext, onComplete and onError (透過Subscriber interface),而這樣的行為是比較危險需要謹慎使用,所以Reactor在3.4.0的版本後完全棄用了Processor,取而代之的是Sinks

both abstract and concrete FluxProcessor and MonoProcessor are deprecated and slated for removal in 3.5.0

Sinks

Sinks就是Reactor優化過後的Processors,他是thread-safe也可以避免一些不符合Reactive 規範的設計,相較於Processor.onNext必須是同步的(synchronized),Sinks有提供兩種Api,tryEmit* API 會回傳EmitResultemit* API提供EmissionFailureHandler,讓你可以更容易的根據發送結果來設計你的api或是更容易的處理錯誤。
Sinks主要有三種:

  • Sinks.One:僅可以傳送一個資料,類似Mono
  • Sinks.Many:可以傳送多筆資料,類似Flux
  • Sinks.Empty:沒有資料,僅能傳送終結訊號(terminal signal),類似Mono.empty()
    除了這三種以外其實還有Sinks.unsafe(),其之下也有相對的Sinks.unsafe().one()、Sinks.unsafe().empty()、Sinks.unsafe().Many(),其實就是不保證thread-safe的版本,所以如果你能夠確保使用情境會是thread-safe的,使用Sinks.unsafe()可以相對增加效能。
    補充一下EmitResult的種類文件
    https://ithelp.ithome.com.tw/upload/images/20210927/20141418XWKy0RvNOX.png

Sinks.Many

Sinks.Many又分為三種

  • multicast:可以有多個訂閱者,每個訂閱者並不會都拿到全部且一樣的資料,而是只會取得訂閱後開始最新的。
  • unicast:透過buffer來處理backpressure,但代價就是只能有一個訂閱者。
  • replay:會快取(cache)所有資料來讓每一個訂閱者都能拿到全部且一樣的資料。

下面分別使用三種來轉為Flux,產生1~5的資料並且有兩個訂閱者。
multicast的部分可以看到subscribe2 只會有訂閱後的最新一筆資料而已,不會有之前的。

Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
multicastSink.emitNext(3, FAIL_FAST);
multicastSink.emitNext(4, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
multicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe1 :5
subscribe2 :5
     */

unicast可以看到第一個訂閱者正常顯示,但當subscribe2 出現後隨即會出現錯誤。

Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
unicastSink.emitNext(3, FAIL_FAST);
unicastSink.emitNext(4, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
unicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
reactor.core.Exceptions$ErrorCallbackNotImplemented:
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber

     */

replay的subscribe2 一樣能取得前面的資料,所以最終兩個訂閱者拿到的資料都是全部且一樣的。

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
replaySink.emitNext(3, FAIL_FAST);
replaySink.emitNext(4, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
replaySink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe2 :1
subscribe2 :2
subscribe2 :3
subscribe2 :4
subscribe1 :5
subscribe2 :5
     */

再補充說明三個種類建構的方法,

  • multicast()
    • onBackpressureBuffer()是第一個訂閱者訂閱之前的暫存,之後的訂閱者就只會收到最新的資料。
    • onBackpressureBuffer(int bufferSize, boolean autoCancel) 可以傳入buffer的大小,並且當所有訂閱者都取消訂閱後是否自動清除buffer。
    • directAllOrNothing() 只要有一個訂閱者變慢(無法消耗(consume更多的資料),則所有訂閱者都會停止,直到恢復正常。
    • directBestEffort() 相較於上者,只會停止推送給無法接受資料的訂閱者,其他則正常。
  • unicast()
    • onBackpressureBuffer() 這個buffer是用來存唯一訂閱者訂閱資料已經推送出去的資料,這樣才能確保訂閱者可以拿到全部,預設是沒有上限所以可能會有OOM的風險,Reactor也提供傳入自訂Queue來限制上限onBackpressureBuffer(Queue),超過的部分就會捨棄掉。
  • replay()
    • limit(int))因為replay會保存資料讓所有訂閱者都能接受到一樣的資料,limit會限制保存的數量。
    • all():所有資料都保存
    • limit(Duration):保存某個時間
    • limit(int, Duration):結合時間跟數量的限制
    • latest():只保存最後一筆
    • latestOrDefault(T):保存最後一筆或是預設值

Sinks.One

類似於Mono,只能有一個值,內含以下三個method,
  • emitValue(T value) 等於 onNext(value) + onComplete()
  • emitEmpty()  等於onComplete(),基本上就是Mono.empty()
  • emitError``(Throwable t) 等於onError(t)

Sinks.Empty

基本上就是Sinks.One,只是沒有emitValue(T value)。

結語

以上就是關於Reactor的 Sinks介紹,希望看完有基礎的了解,感覺Sinks是比較進階的使用方式,如果後續有找到使用情境會再補充。

資料來源


上一篇
[Day 12] Reactive Programming - Reactor(publishOn/subscribeOn)
下一篇
[Day 14] Reactive Programming -Reactor(COLD VS HOT) -PART 1
系列文
從零開始Reactive Programming- Spring32
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言